package com.atguigu.flink.checkpoint;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.time.Duration;

/**
 * Created by Smexy on 2023/11/15
 *
 * <p>Demo job that is prepared for savepoints.
 *
 * <p>Savepoints are designed for scenarios such as upgrading a job or changing its
 * parallelism, so detailed metadata has to be provided for the operators. In practice
 * this means manually assigning an id to every operator via {@code uid(...)}.
 */
public class Demo2_Savepoint
{
    /**
     * Builds and runs the pipeline: socket source -> stateful map -> filter -> printing sink.
     *
     * <p>Checkpointing is enabled with an interval of 5000 ms, checkpoints are stored on HDFS
     * and are retained when the job is cancelled. Every operator gets an explicit uid.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if executing the job fails; the original exception is
     *                               attached as the cause
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(5000);
        // Checkpoint storage settings
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/ck1");
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env
            .socketTextStream("hadoop102", 8888).uid("s")
            .map(new MyMapFunction()).uid("map")
            .filter(s -> s.contains("a")).uid("f")
            .addSink(new SinkFunction<String>()
            {
                @Override
                public void invoke(String value, Context context) throws Exception {
                    if (value.contains("x")) {
                        // Manually simulate a failure
                        throw new RuntimeException("出故障了...");
                    }
                    System.out.println(value);
                }
            }).uid("sink");

        try {
            env.execute();
        } catch (Exception e) {
            // Do not swallow the failure: rethrow (keeping the cause) so that the caller /
            // JVM exit status reflects that the job did not run successfully.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }

    /**
     * Map function that appends every incoming element to operator list state and emits the
     * string form of everything stored so far.
     */
    private static class MyMapFunction implements MapFunction<String,String>, CheckpointedFunction
    {

        // transient: the state handle is assigned in initializeState() at runtime and is
        // not meant to be part of the serialized function object.
        private transient ListState<String> result;

        /**
         * Adds {@code value} to the list state and returns the string representation of the
         * whole state content.
         *
         * @param value the incoming element
         * @return {@code toString()} of the current state content, including {@code value}
         * @throws Exception if accessing the state fails
         */
        @Override
        public String map(String value) throws Exception {
            // Write state
            result.add(value);
            // Read state
            return result.get().toString();
        }

        /**
         * Intentionally empty: {@link #map(String)} writes directly into the list state
         * obtained from the operator state store, so there is no separate in-memory buffer
         * that would have to be copied into state here.
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        }

        /**
         * Obtains the operator list state named {@code "1"} from the operator state store and
         * keeps a handle to it for use in {@link #map(String)}.
         *
         * @param context gives access to the operator state store
         * @throws Exception if the state cannot be obtained
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {

            OperatorStateStore operatorStateStore = context.getOperatorStateStore();
            ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("1", String.class);
            result = operatorStateStore.getListState(listStateDescriptor);
        }
    }
}
